{
  "nbformat": 4,
  "nbformat_minor": 0,
  "metadata": {
    "accelerator": "GPU",
    "colab": {
      "name": "23 - Tensor workflows",
      "provenance": [],
      "collapsed_sections": []
    },
    "kernelspec": {
      "display_name": "Python 3",
      "name": "python3"
    }
  },
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "4Pjmz-RORV8E"
      },
      "source": [
        "# Tensor workflows\n",
        "\n",
        "_This notebook is part of a tutorial series on [txtai](https://github.com/neuml/txtai), an AI-powered semantic search platform._\n",
        "\n",
        "Many of the examples and use cases for txtai focus on transforming text. Makes sense as txt is even in the name! But that doesn't mean txtai only works with text.\n",
        "\n",
        "This notebook will cover examples of how to efficiently process tensors using txtai workflows."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "Dk31rbYjSTYm"
      },
      "source": [
        "# Install dependencies\n",
        "\n",
        "Install `txtai` and all dependencies. We will install the api, pipeline and workflow optional extras packages, along with the datasets package. "
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "XMQuuun2R06J"
      },
      "source": [
        "%%capture\n",
        "!pip install git+https://github.com/neuml/txtai#egg=txtai[api,pipeline,workflow] datasets"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "NSYrP0hjtR_E"
      },
      "source": [
        "# Transform large tensor arrays\n",
        "\n",
        "The first section attempts to apply a simple transform to a very large memory-mapped array (2,000,000 x 1024)."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "BoPJIKWoTibk",
        "colab": {
          "base_uri": "https://localhost:8080/",
          "height": 220
        },
        "outputId": "143a6e4d-fe56-4353-e8ee-595ddfc12249"
      },
      "source": [
        "import numpy as np\n",
        "import torch\n",
        "\n",
        "# Generate large memory-mapped array\n",
        "rows, cols = 2000000, 1024\n",
        "data = np.memmap(\"data.npy\", dtype=np.float32, mode=\"w+\", shape=(rows, cols))\n",
        "del data\n",
        "\n",
        "# Open memory-mapped array\n",
        "data = np.memmap(\"data.npy\", dtype=np.float32, shape=(rows, cols))\n",
        "\n",
        "# Create tensor\n",
        "tensor = torch.from_numpy(data).to(\"cuda:0\")\n",
        "\n",
        "# Apply tanh transform to tensor\n",
        "torch.tanh(tensor).shape"
      ],
      "execution_count": null,
      "outputs": [
        {
          "output_type": "error",
          "ename": "RuntimeError",
          "evalue": "ignored",
          "traceback": [
            "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
            "\u001b[0;31mRuntimeError\u001b[0m                              Traceback (most recent call last)",
            "\u001b[0;32m<ipython-input-2-a1fc94fedb69>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m()\u001b[0m\n\u001b[1;32m     14\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m     15\u001b[0m \u001b[0;31m# Apply tanh transform to tensor\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 16\u001b[0;31m \u001b[0mtorch\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mtanh\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mtensor\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mshape\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m",
            "\u001b[0;31mRuntimeError\u001b[0m: CUDA out of memory. Tried to allocate 7.63 GiB (GPU 0; 11.17 GiB total capacity; 7.63 GiB already allocated; 3.04 GiB free; 7.63 GiB reserved in total by PyTorch) If reserved memory is >> allocated memory try setting max_split_size_mb to avoid fragmentation.  See documentation for Memory Management and PYTORCH_CUDA_ALLOC_CONF"
          ]
        }
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "O8mKzPP01d_m",
        "outputId": "929226a8-6948-4d17-ab70-025da2081abd"
      },
      "source": [
        "!ls -l --block-size=MB data.npy"
      ],
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "-rw-r--r-- 1 root root 8192MB Dec  6 23:24 data.npy\n"
          ]
        }
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "vuObmAJ9FaJe"
      },
      "source": [
        "Not surprisingly this runs out of CUDA memory. The array needs `2,000,000 * 1024 * 4 = 8GB` which exceeds the amount of GPU memory available.\n",
        "\n",
        "One of the great things about NumPy and PyTorch arrays is that they can be sliced without having to copy data. Additionally, PyTorch has methods to work directly on NumPy arrays without copying data, in other words both NumPy arrays and PyTorch arrays can share the same memory. This opens the door to efficient processing of tensor data in place. \n",
        "\n",
        "Let's try applying a simple tanh transform in batches over the array."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "ciD6unQYD-bJ",
        "outputId": "d3b6a0c5-aea5-451d-d3e3-60d04ef33a9e"
      },
      "source": [
        "def process(x):\n",
        "  print(x.shape)\n",
        "  return torch.tanh(torch.from_numpy(x).to(\"cuda:0\")).cpu().numpy()\n",
        "\n",
        "# Split into 250,000 rows per call\n",
        "batch = 250000\n",
        "count = 0\n",
        "for x in range(0, len(data), batch):\n",
        "  for row in process(data[x : x + batch]):\n",
        "    count += 1\n",
        "\n",
        "print(count)"
      ],
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "(250000, 1024)\n",
            "(250000, 1024)\n",
            "(250000, 1024)\n",
            "(250000, 1024)\n",
            "(250000, 1024)\n",
            "(250000, 1024)\n",
            "(250000, 1024)\n",
            "(250000, 1024)\n",
            "2000000\n"
          ]
        }
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "uZBzEMsRHpsi"
      },
      "source": [
        "Iterating over the data array and selecting slices to operate on allows the transform to complete successfully! Each `torch.from_numpy` call is building a view of a portion the existing large NumPy data array. "
      ]
    },
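    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The zero-copy behavior can be checked on a small array. The following is a minimal sketch that runs on the CPU. The array size and the names `small`, `view` and `shared` are illustrative and not part of the example above. It uses `np.shares_memory` to confirm that a slice is a view, then shows that a tensor created with `torch.from_numpy` reflects a write made to the underlying NumPy array."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {},
      "source": [
        "import numpy as np\n",
        "import torch\n",
        "\n",
        "# Small stand-in for the large memory-mapped array (illustrative size)\n",
        "small = np.zeros((8, 4), dtype=np.float32)\n",
        "\n",
        "# Basic slicing returns a view, not a copy\n",
        "view = small[2:4]\n",
        "print(np.shares_memory(small, view))\n",
        "\n",
        "# torch.from_numpy wraps the same buffer without copying.\n",
        "# Only a later move to another device, such as .to(\"cuda:0\"), copies the slice.\n",
        "shared = torch.from_numpy(view)\n",
        "\n",
        "# A write to the NumPy array is visible through the tensor\n",
        "small[2, 0] = 1.0\n",
        "print(shared[0, 0].item())"
      ],
      "execution_count": null,
      "outputs": []
    },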
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "Oe7X17vbHJRV"
      },
      "source": [
        "# Enter workflows\n",
        "\n",
        "The next section takes the same array and shows how workflows can apply transformations to tensors. "
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "ymqr92kW9hxd",
        "outputId": "e4a4c7d1-be54-46c2-bc7c-dd849adfeb7e"
      },
      "source": [
        "from txtai.workflow import Task, Workflow\n",
        "\n",
        "# Create workflow with a single task calling process for each batch\n",
        "task = Task(process)\n",
        "workflow = Workflow([task], batch)\n",
        "\n",
        "# Run workflow\n",
        "count = 0\n",
        "for row in workflow(data):\n",
        "  count += 1\n",
        "\n",
        "print(count)"
      ],
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "(250000, 1024)\n",
            "(250000, 1024)\n",
            "(250000, 1024)\n",
            "(250000, 1024)\n",
            "(250000, 1024)\n",
            "(250000, 1024)\n",
            "(250000, 1024)\n",
            "(250000, 1024)\n",
            "2000000\n"
          ]
        }
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "B9qC8qUbHjfk"
      },
      "source": [
        "Workflows process the data in the same fashion as the code in the previous section. On top of that, workflows can handle text, images, video, audio, document, tensors and more. Workflow graphs can also be connected together to handle complex use cases."
      ]
    },
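    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "As a small illustration of chaining, the sketch below connects two tasks in a single workflow using the same `Task` and `Workflow` constructors as above. The lambdas, the input list and the batch size of 2 are made up for this example. Each task receives a batch of elements and returns one output per input element, and the second task runs on the outputs of the first."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {},
      "source": [
        "from txtai.workflow import Task, Workflow\n",
        "\n",
        "# Two illustrative tasks: the first doubles each element, the second adds one\n",
        "double = Task(lambda x: [y * 2 for y in x])\n",
        "increment = Task(lambda x: [y + 1 for y in x])\n",
        "\n",
        "# Process the input two elements at a time\n",
        "chained = Workflow([double, increment], 2)\n",
        "\n",
        "print(list(chained([1, 2, 3, 4, 5])))"
      ],
      "execution_count": null,
      "outputs": []
    },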
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "wCRD9ERoJvsG"
      },
      "source": [
        "# Workflows with PyTorch models\n",
        "\n",
        "The next example applies a PyTorch model to the same data. The model applies a series of transforms and outputs a single float per row."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "N9UTfSTTIDaO",
        "outputId": "000c7e13-a249-43d0-f419-28ddd62e8ba1"
      },
      "source": [
        "from torch import nn\n",
        "\n",
        "class Model(nn.Module):\n",
        "    def __init__(self):\n",
        "        super().__init__()\n",
        "\n",
        "        self.gelu = nn.ReLU()\n",
        "        self.linear1 = nn.Linear(1024, 512)\n",
        "        self.dropout = nn.Dropout(0.5)\n",
        "        self.norm = nn.LayerNorm(512)\n",
        "        self.linear2 = nn.Linear(512, 1)\n",
        "\n",
        "    def forward(self, inputs):\n",
        "        outputs = self.gelu(inputs)\n",
        "        outputs = self.linear1(outputs)\n",
        "        outputs = self.dropout(outputs)\n",
        "        outputs = self.norm(outputs)\n",
        "        outputs = self.linear2(outputs)\n",
        "\n",
        "        return outputs\n",
        "\n",
        "model = Model().to(\"cuda:0\")\n",
        "\n",
        "def process(x):\n",
        "  with torch.no_grad():\n",
        "    outputs = model(torch.from_numpy(x).to(\"cuda:0\")).cpu().numpy()\n",
        "    print(outputs.shape)\n",
        "    return outputs\n",
        "\n",
        "# Create workflow with a single task calling model for each batch\n",
        "task = Task(process)\n",
        "workflow = Workflow([task], batch)\n",
        "\n",
        "# Run workflow\n",
        "count = 0\n",
        "for row in workflow(data):\n",
        "  count += 1\n",
        "\n",
        "print(count)"
      ],
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "(250000, 1)\n",
            "(250000, 1)\n",
            "(250000, 1)\n",
            "(250000, 1)\n",
            "(250000, 1)\n",
            "(250000, 1)\n",
            "(250000, 1)\n",
            "(250000, 1)\n",
            "2000000\n"
          ]
        }
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "Q1ivX4eBuU8T"
      },
      "source": [
        "Once again the data can be processed in batches using workflows, even with a more complex model. Let's try a more interesting example."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "KoSB1mKzUnb0"
      },
      "source": [
        "# Workflows in parallel\n",
        "\n",
        "Workflows consist of a series of tasks. Each task can output one to many outputs per input element. Multi-output tasks have options available to [merge the data](https://neuml.github.io/txtai/workflow/task/#multi-action-task-merges) for downstream tasks.\n",
        "\n",
        "The following example builds a workflow with a task having three separate actions. Each action takes text as an input an applies a sentiment classifier. This is followed by a task that merges the three outputs for each row using a mean transform. Essentially, this workflow builds a weighted sentiment classifier using the outputs of three models. "
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "JlCdVgo_LXOl"
      },
      "source": [
        "import time\n",
        "\n",
        "from datasets import load_dataset\n",
        "from transformers import AutoTokenizer, AutoModelForSequenceClassification\n",
        "\n",
        "class Tokens:\n",
        "    def __init__(self, texts):\n",
        "        tokenizer = AutoTokenizer.from_pretrained(\"distilbert-base-uncased-finetuned-sst-2-english\")\n",
        "        tokens = tokenizer(texts, padding=True, return_tensors=\"pt\").to(\"cuda:0\")\n",
        "\n",
        "        self.inputs, self.attention = tokens[\"input_ids\"], tokens[\"attention_mask\"]\n",
        "\n",
        "    def __len__(self):\n",
        "        return len(self.inputs)\n",
        "\n",
        "    def __getitem__(self, value):\n",
        "        return (self.inputs[value], self.attention[value])\n",
        "\n",
        "class Classify:\n",
        "    def __init__(self, model):\n",
        "        self.model = model\n",
        "\n",
        "    def __call__(self, tokens):\n",
        "        with torch.no_grad():\n",
        "            inputs, attention = tokens\n",
        "            outputs = self.model(input_ids=inputs, attention_mask=attention)\n",
        "            outputs = outputs[\"logits\"]\n",
        "\n",
        "        return outputs\n",
        "\n",
        "# Load reviews from the rotten tomatoes dataset\n",
        "ds = load_dataset(\"rotten_tomatoes\")\n",
        "texts = ds[\"train\"][\"text\"]\n",
        "\n",
        "tokens = Tokens(texts)\n",
        "\n",
        "model1 = AutoModelForSequenceClassification.from_pretrained(\"M-FAC/bert-tiny-finetuned-sst2\")\n",
        "model1 = model1.to(\"cuda:0\")\n",
        "\n",
        "model2 = AutoModelForSequenceClassification.from_pretrained(\"howey/electra-base-sst2\")\n",
        "model2 = model2.to(\"cuda:0\")\n",
        "\n",
        "model3 = AutoModelForSequenceClassification.from_pretrained(\"philschmid/MiniLM-L6-H384-uncased-sst2\")\n",
        "model3 = model3.to(\"cuda:0\")\n",
        "\n",
        "task1 = Task([Classify(model1), Classify(model2), Classify(model3)])\n",
        "task2 = Task([lambda x: torch.sigmoid(x).mean(axis=1).cpu().numpy()])\n",
        "\n",
        "workflow = Workflow([task1, task2], 250)\n",
        "\n",
        "start = time.time()\n",
        "for x in workflow(tokens):\n",
        "  pass\n",
        "\n",
        "print(f\"Took {time.time() - start} seconds\")"
      ],
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "name": "stderr",
          "text": [
            "Using custom data configuration default\n",
            "Reusing dataset rotten_tomatoes_movie_review (/root/.cache/huggingface/datasets/rotten_tomatoes_movie_review/default/1.0.0/40d411e45a6ce3484deed7cc15b82a53dad9a72aafd9f86f8f227134bec5ca46)\n"
          ]
        },
        {
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "Took 84.73194456100464 seconds\n"
          ]
        }
      ]
    },
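    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The merge step in the workflow above can be illustrated in isolation. The sketch below is an assumption about the shapes involved, not a copy of txtai's merge code: it assumes the three `(batch, 2)` logit tensors are stacked along a new axis 1, which is what the `mean(axis=1)` in `task2` suggests. Random tensors stand in for the model outputs, and the names `logits`, `merged` and `scores` are illustrative."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {},
      "source": [
        "import torch\n",
        "\n",
        "# Stand-ins for the logits of three binary classifiers on a batch of 4 rows\n",
        "logits = [torch.randn(4, 2) for _ in range(3)]\n",
        "\n",
        "# Assumed merge: three (4, 2) tensors become one (4, 3, 2) tensor\n",
        "merged = torch.stack(logits, dim=1)\n",
        "print(merged.shape)\n",
        "\n",
        "# Same transform as task2: sigmoid, then a mean over the model axis.\n",
        "# The result has one averaged score per class for each row: (4, 2)\n",
        "scores = torch.sigmoid(merged).mean(axis=1)\n",
        "print(scores.shape)"
      ],
      "execution_count": null,
      "outputs": []
    },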
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "eBQzLrtAVUtB"
      },
      "source": [
        "Note that while the task actions are parallel, that doesn't necessarily mean the operations are concurrent. In the case above, the actions are are executed sequentially.\n",
        "\n",
        "Workflows have an additional option to run task actions concurrently. The two supported modes are \"thread\" and \"process\". I/O bound actions will do better with multithreading and CPU bound actions will do better with multiprocessing. More can be read in the [txtai documentation](https://neuml.github.io/txtai/workflow/task/#multi-action-task-concurrency). "
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "AB0onoOlVT-e",
        "outputId": "a072d8a6-3b3b-4066-8881-8af9a8b96608"
      },
      "source": [
        "task1 = Task([Classify(model1), Classify(model2), Classify(model3)], concurrency=\"thread\")\n",
        "task2 = Task([lambda x: torch.sigmoid(x).mean(axis=1).cpu().numpy()])\n",
        "\n",
        "workflow = Workflow([task1, task2], 250)\n",
        "\n",
        "start = time.time()\n",
        "for x in workflow(tokens):\n",
        "  pass\n",
        "\n",
        "print(f\"Took {time.time() - start} seconds\")"
      ],
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "Took 85.21102929115295 seconds\n"
          ]
        }
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "5s2KexhG_udx"
      },
      "source": [
        "In this case, concurrency doesn't improve performance. While the [GIL](https://wiki.python.org/moin/GlobalInterpreterLock) is a factor, a bigger factor is that the GPU is already fully loaded. This method would be more beneficial if the system had a second GPU or the primary GPU had idle cycles. "
      ]
    },
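    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "To see the kind of workload where threads do help, here is a standalone sketch using Python's `concurrent.futures`. It doesn't use txtai and isn't meant to mirror its internals. The `action` function is hypothetical, and `time.sleep` stands in for an I/O wait such as a network call. Because the waits overlap, the threaded run should finish in roughly the time of a single call, while the sequential run takes roughly the sum of all three."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {},
      "source": [
        "import time\n",
        "\n",
        "from concurrent.futures import ThreadPoolExecutor\n",
        "\n",
        "def action(x):\n",
        "    # Simulated I/O wait (illustrative)\n",
        "    time.sleep(0.2)\n",
        "    return x\n",
        "\n",
        "inputs = [1, 2, 3]\n",
        "\n",
        "# Sequential: the waits add up\n",
        "start = time.time()\n",
        "sequential = [action(x) for x in inputs]\n",
        "print(f\"Sequential took {time.time() - start:.2f} seconds\")\n",
        "\n",
        "# Threaded: the waits overlap\n",
        "start = time.time()\n",
        "with ThreadPoolExecutor(max_workers=3) as executor:\n",
        "    threaded = list(executor.map(action, inputs))\n",
        "print(f\"Threaded took {time.time() - start:.2f} seconds\")"
      ],
      "execution_count": null,
      "outputs": []
    },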
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "EoQFEi_61P9O"
      },
      "source": [
        "# Wrapping up\n",
        "\n",
        "This notebook introduced a number of different ways to work with large-scale tensor data and process it efficiently. This notebook purposely didn't cover embeddings and pipelines to demonstrate how workflows can stand on their own. In addition to workflows, this notebook covered efficient methods to work with large tensor arrays in PyTorch and NumPy."
      ]
    }
  ]
}
